In [1]:
!ls -l $SPARK_HOME
In [2]:
# Note: set SPARK_HOME to the Spark binaries before launching the Jupyter session.
import os, sys
import glob

SPARK_HOME = os.environ['SPARK_HOME']

# Locate the bundled py4j zip instead of hardcoding its version —
# the filename changes between Spark releases (e.g. py4j-0.10.4 vs py4j-0.10.7).
py4j_zips = glob.glob(os.path.join(SPARK_HOME, "python", "lib", "py4j-*-src.zip"))
if not py4j_zips:
    raise FileNotFoundError("No py4j-*-src.zip found under $SPARK_HOME/python/lib")
sys.path.insert(0, py4j_zips[0])
sys.path.insert(0, os.path.join(SPARK_HOME, "python"))

from pyspark.sql import SparkSession

# Reuse an existing session if one is already running in this kernel.
spark = SparkSession.builder.getOrCreate()
print("Spark version: ", spark.version)
In [3]:
spark.sparkContext.uiWebUrl
Out[3]:
In [4]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import evaluation
from pyspark.sql.functions import *
import pandas as pd
import pyspark
import numpy as np
In [5]:
pd.__version__, np.__version__,pyspark.__version__
Out[5]:
Check the versions of the libraries. This notebook uses Spark 2.2.0.
You can download the dataset from here.
In [6]:
# Read the credit-default dataset, inferring column types from the data,
# and cache it since we scan it several times below.
credit = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/data/credit-default.csv")
    .cache()
)
print("Total number of records: ", credit.count())
# Pull 10 sample records from the Spark dataframe into a pandas dataframe to display
# the values — I prefer the pandas display to the Spark dataframe's show() output.
credit.limit(10).toPandas().head()
Out[6]:
View the schema
In [7]:
credit.printSchema()
As I can see, there are a number of columns of string type - checking_balance, credit_history etc.
Let me define a function that takes a categorical column, passes it through StringIndexer and OneHotEncoder, and gives back a dataframe with the same column name as the original categorical column. It returns a new dataframe in which each categorical column is replaced by its one-hot-encoded vector.
Find all columns of String datatype.
Transform each string-typed column into a one-hot-encoded value, and collect the distinct values of each categorical column in a list as shown below.
In [8]:
# All feature columns: every column except the label column "default".
cols = [c for c in credit.columns if c != "default"]
cols
Out[8]:
In [9]:
from pyspark.ml import Model, Estimator
class DFOneHotEncoderModel(Model):
    """Fitted companion of ``DFOneHotEncoder``.

    Holds, for every categorical column, the fitted ``StringIndexerModel`` and
    (optionally) a ``OneHotEncoder``.  ``transform`` replaces each categorical
    column of a dataframe with its indexed / one-hot-encoded value while
    keeping the original column name.

    Attributes (assigned by ``DFOneHotEncoder.fit``):
        categorical_fields: dict mapping column name ->
            (StringIndexerModel, OneHotEncoder or None)
        one_hot: whether columns are one-hot encoded (vs. only string-indexed)
        drop_last: whether the encoder drops the last category value
        columns: column names of the dataframe the estimator was fitted on
        label_column: name of the label column (excluded from feature labels)
    """

    def get_col_labels(self):
        """Return human-readable names for the slots of the assembled feature vector.

        One-hot-encoded columns contribute one label per category value
        (``<col>_<value>``), honouring ``drop_last``.  All other columns
        contribute their own name.

        Bug fix vs. the original: when ``one_hot`` is False, ``transform``
        leaves a single string-indexed column per categorical field, so the
        column must NOT be expanded into per-category labels.
        """
        labels = []
        feature_columns = [c for c in self.columns if not c == self.label_column]
        for col in feature_columns:
            if self.one_hot and col in self.categorical_fields:
                string_indexer, _ = self.categorical_fields[col]
                values = string_indexer.labels
                # The encoder drops the last category when drop_last is set.
                values = values[:-1] if self.drop_last else values
                labels.extend(col + "_" + v for v in values)
            else:
                labels.append(col)
        return labels

    def transform(self, df, params=None):
        """Replace every categorical column with its encoded representation.

        Each column is string-indexed and, if fitted with ``one_hot``, then
        one-hot encoded.  The intermediate ``_idx`` / ``_ohe`` columns are
        renamed back to the original column name so downstream code sees a
        stable schema.  ``params`` is accepted for pyspark API compatibility
        and ignored.
        """
        for colname in self.categorical_fields:
            string_indexer, one_hot_encoder = self.categorical_fields[colname]
            df = string_indexer.transform(df)
            # Rename the "_idx" output back to the original name so the
            # one-hot encoder (fitted with inputCol=colname) can consume it.
            df = df.drop(colname).withColumnRenamed(colname + "_idx", colname)
            if one_hot_encoder:
                df = one_hot_encoder.transform(df)
                df = df.drop(colname).withColumnRenamed(colname + "_ohe", colname)
        return df
class DFOneHotEncoder(Estimator):
    """Estimator that string-indexes and optionally one-hot encodes every
    categorical (string-typed) column of a dataframe, keeping column names stable.

    Parameters
    ----------
    label_column : str
        Name of the label column; the fitted model excludes it when listing
        feature labels.
    categorical_fields : list of str, optional
        Columns to encode.  When None, all string-typed columns are detected
        at fit time.  NOTE(review): auto-detection includes the label column
        if it is string-typed — confirm that is intended for your dataset.
    one_hot : bool
        If True, columns are one-hot encoded after string indexing; otherwise
        they are only string-indexed.
    drop_last : bool
        Forwarded to ``OneHotEncoder``; drops the last category of each column.
    """

    def __init__(self, label_column, categorical_fields=None, one_hot=True, drop_last=True):
        self.one_hot = one_hot
        self.drop_last = drop_last
        self.label_column = label_column
        # Store only the requested column names; fitted transformers are
        # created in fit() and attached to the returned model, not to self.
        self.categorical_fields = (
            None if categorical_fields is None
            else dict.fromkeys(categorical_fields)
        )

    def fit(self, df, params=None):
        """Fit one StringIndexer (and build one OneHotEncoder) per categorical column.

        Returns a ``DFOneHotEncoderModel``.  Fix vs. the original: the fitted
        transformers are stored on the returned model only — the estimator is
        no longer mutated, so ``fit`` is idempotent and can be applied to
        several dataframes.  ``params`` is accepted for pyspark API
        compatibility and ignored.
        """
        if self.categorical_fields is None:
            # Auto-detect: treat every string-typed column as categorical.
            colnames = [col for col, dtype in df.dtypes if dtype == "string"]
        else:
            colnames = list(self.categorical_fields)

        fitted = {}
        for colname in colnames:
            string_indexer = StringIndexer(inputCol=colname, outputCol=colname + "_idx").fit(df)
            one_hot_encoder = None
            if self.one_hot:
                # The model's transform renames the "_idx" column back to the
                # original name before encoding, hence inputCol=colname here.
                one_hot_encoder = OneHotEncoder(inputCol=colname,
                                                outputCol=colname + "_ohe",
                                                dropLast=self.drop_last)
            fitted[colname] = (string_indexer, one_hot_encoder)

        model = DFOneHotEncoderModel()
        model.categorical_fields = fitted
        model.one_hot = self.one_hot
        model.drop_last = self.drop_last
        model.columns = df.columns
        model.label_column = self.label_column
        return model
In [10]:
# Fit the custom encoder on the raw data and encode all string columns in place.
model = DFOneHotEncoder(label_column = "default").fit(credit)
df = model.transform(credit)
# Show the resulting column types and the expanded feature labels.
print(df.dtypes)
print("\n")
print(model.get_col_labels())
Verify that all columns in df are either numeric or numeric-vector type
In [11]:
df.printSchema()
Create a list of columns except the label column
Use a vector assembler to transform all features into a single feature column
In [12]:
# Assemble all (now numeric / vector) feature columns into a single "features" vector.
df_vect = VectorAssembler(inputCols = cols, outputCol="features").transform(df)
# Show the assembled features alongside the label for a few rows.
df_vect.select("features", "default").limit(5).toPandas()
Out[12]:
Let me spot-check whether the one-hot encoding worked correctly.
In [13]:
credit.first()
Out[13]:
In [14]:
pd.DataFrame({"feature": model.get_col_labels(), "value": df_vect.select("features").first().features})
Out[14]:
In [15]:
# 70/30 train/test split with a fixed seed for reproducibility.
df_train, df_test = df_vect.randomSplit(weights=[0.7, 0.3], seed=1)
df_train.count(), df_test.count()
Out[15]:
In [16]:
# Train a random-forest classifier on the assembled features (fixed seed for reproducibility).
forest = RandomForestClassifier(labelCol="default", featuresCol="features", seed = 123)
forest_model = forest.fit(df_train)
In [17]:
# Score the held-out test set and peek at a few predictions.
df_test_pred = forest_model.transform(df_test)
df_test_pred.show(5)
In [18]:
df_test_pred.groupBy("default").pivot("prediction").count().show()
In [19]:
# Test-set accuracy: fraction of records whose predicted label matches the actual one.
evaluator = evaluation.MulticlassClassificationEvaluator(labelCol="default",
metricName="accuracy", predictionCol="prediction")
evaluator.evaluate(df_test_pred)
Out[19]:
In [20]:
# Rank features by the forest's impurity-based importance scores, highest first.
print("Total number of features: ", forest_model.numFeatures, "\nOrder of feature importance: \n")
pd.DataFrame({"importance": forest_model.featureImportances.toArray(),
"feature": model.get_col_labels()
}).sort_values("importance", ascending = False)
Out[20]:
In [21]:
from pyspark.ml.pipeline import Pipeline, PipelineModel
In [23]:
# Rebuild the end-to-end flow as a single Pipeline: encode -> assemble -> forest.
credit = spark.read.options(header = True, inferSchema = True).csv("/data/credit-default.csv").cache()

label_col = "default"
feature_cols = credit.columns
feature_cols.remove(label_col)

# Split the RAW data this time; all encoding happens inside the pipeline,
# so the fitted indexers only ever see the training split.
df_train, df_test = credit.randomSplit(weights=[0.7, 0.3], seed=1)

pipeline = Pipeline()
print(pipeline.explainParams())

encoder = DFOneHotEncoder(label_column = label_col)
vectorizer = VectorAssembler(inputCols = feature_cols, outputCol="features")
# Consistency fix: use label_col everywhere instead of repeating the "default" literal.
forest = RandomForestClassifier(labelCol=label_col, featuresCol="features", seed = 123)

pipeline.setStages([encoder, vectorizer, forest])
pipelineModel = pipeline.fit(df_train)

# Score the held-out split and report accuracy.
df_test_pred = pipelineModel.transform(df_test)
evaluator = evaluation.MulticlassClassificationEvaluator(labelCol=label_col,
                                                         metricName="accuracy",
                                                         predictionCol="prediction")
accuracy = evaluator.evaluate(df_test_pred)
print("Accuracy", accuracy)
In [ ]: